Storm Study Notes, Chapter 4: Storm Programming

ISpout

Overview

The core interface, responsible for emitting data into the topology for processing
Storm tracks the DAG of the tuples that a Spout emits
Processing results are reported back to the Spout through ack/fail
Each tuple can carry a message id, which Storm uses to identify it in those callbacks
ack, fail, and nextTuple are all invoked on the same thread, so thread safety between them is not a concern

Core methods

open: initialization
close: releases resources
nextTuple: emits data; the core API
ack: Storm reports back to the Spout that a tuple was processed successfully
fail: Storm reports back to the Spout that a tuple failed (see the sketch below)
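
A minimal sketch of how these methods fit together: the Spout emits every tuple with a message id, and Storm later calls ack or fail with that same id. The payload and the in-memory pending map are illustrative assumptions; a production Spout would usually lean on a message queue's redelivery instead.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReliableSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    // Tuples emitted but not yet acked, keyed by message id.
    // ack/fail/nextTuple run on the same thread, so a plain HashMap is safe.
    private Map<Object, String> pending;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new HashMap<>();
    }

    @Override
    public void nextTuple() {
        String message = "hello";                       // hypothetical payload
        Object msgId = UUID.randomUUID().toString();
        pending.put(msgId, message);
        // Emitting with a message id tells Storm to track the tuple's DAG
        // and to call ack(msgId) or fail(msgId) once its fate is known.
        collector.emit(new Values(message), msgId);
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);                          // fully processed downstream
    }

    @Override
    public void fail(Object msgId) {
        String message = pending.get(msgId);
        if (message != null) {
            collector.emit(new Values(message), msgId); // simple re-emit on failure
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}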

Implementation classes

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout
public interface IRichSpout extends ISpout, IComponent
DRPCSpout
ShellSpout

The IComponent interface

Overview:

public interface IComponent extends Serializable
Provides the methods common to all possible components of a topology

void declareOutputFields(OutputFieldsDeclarer declarer);
Declares the field names of the tuples emitted by the current Spout/Bolt
Used together with the OutputFieldsDeclarer passed in
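
For example, a component that emits two-field tuples might declare them like this (the field names are illustrative):

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Downstream Bolts can then read values by these names,
    // e.g. tuple.getStringByField("word").
    declarer.declare(new Fields("word", "count"));
}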

Implementation classes:

public abstract class BaseComponent implements IComponent

The IBolt interface

Overview

Responsibility: receives tuples and applies the corresponding processing (filter/join/...)
A Bolt may hold on to tuples and process them later; processing need not happen synchronously inside execute
An IBolt object is created on the client machine, serialized with Java serialization, and submitted to the master node (Nimbus)
Nimbus then launches workers that deserialize the Bolt, call its prepare method, and start processing tuples

Methods

prepare: initialization
execute: processes a single tuple; the Tuple object also carries metadata (source component, stream, task)
cleanup: releases resources before shutdown (see the sketch below)
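
A minimal sketch of this lifecycle, assuming a hypothetical Bolt that upper-cases a string field. Note that a raw IRichBolt must ack its input tuples itself; the convenience class BaseBasicBolt would handle acking automatically.

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class UpperCaseBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;   // keep the collector for use in execute
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        // Anchoring the emit to the input tuple keeps the tuple tree intact,
        // so the Spout's ack/fail callbacks cover this step too.
        collector.emit(input, new Values(line.toUpperCase()));
        collector.ack(input);         // required for a raw IRichBolt
    }

    @Override
    public void cleanup() {
        // release resources (connections, file handles, ...) before shutdown
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper"));
    }
}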

Implementation classes:

public abstract class BaseRichBolt extends BaseComponent implements IRichBolt
public interface IRichBolt extends IBolt, IComponent
RichShellBolt

Sum Example

Requirement: 1 + 2 + 3 + .... = ???
Implementation plan:

The Spout emits the numbers as input
A Bolt implements the business logic: summing
The result is printed to the console

Topology design: DataSourceSpout --> SumBolt

package cn.northpark.bigdata.storm;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
/**
 * Cumulative sum implemented with Storm.
 * @author w_zhangyang
 */
public class LocalSumStormTopology {

    /**
     * The Spout extends BaseRichSpout.
     * The data source produces numbers and emits them.
     *
     * @author w_zhangyang
     */
    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        private int number = 0;

        /**
         * Initialization; called exactly once.
         * @param conf configuration parameters
         * @param context topology context
         * @param collector the tuple emitter
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Produces the data. In production this would typically come
         * from a message queue.
         *
         * Storm calls this method in an endless loop.
         */
        @Override
        public void nextTuple() {
            // number++ emits the current value and then increments,
            // so the printed number is one ahead of the emitted value.
            this.collector.emit(new Values(number++));

            System.out.println("spout: " + number);

            // Throttle so the data is not produced too fast.
            Utils.sleep(1000);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }

    /**
     * The summing Bolt: receives the numbers and accumulates them.
     *
     * @author w_zhangyang
     */
    public static class SumBolt extends BaseRichBolt {

        private int sum = 0;

        /**
         * Initialization; called exactly once.
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        }

        @Override
        public void execute(Tuple input) {
            // A Bolt can read a value by index or, preferably, by the
            // field name declared in the previous component.
            Integer num = input.getIntegerByField("num");
            sum += num;
            System.out.println("bolt sum = [" + sum + "]");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // This Bolt is the end of the pipeline and emits nothing.
        }
    }

    public static void main(String[] args) {
        // TopologyBuilder wires the Spout and the Bolt into a Topology.
        // Every Storm job is submitted as a Topology, and the builder
        // defines the order in which the Spout and the Bolt execute.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SumBolt", new SumBolt())
                .shuffleGrouping("DataSourceSpout");

        // Create a local Storm cluster: local mode needs no Storm installation.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalSumStormTopology", new Config(), builder.createTopology());
    }
}

Word Count

Requirement: read the data under a given directory and count the words in it.
Implementation plan:

A Spout reads the data under the given directory as the input for the downstream Bolts
One Bolt splits the input; here we split on punctuation
One Bolt performs the final per-word count
The result is printed

Topology design: DataSourceSpout ==> SplitBolt ==> CountBolt

package cn.northpark.bigdata.storm;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
/**
 * Word-count topology.
 * @author w_zhangyang
 */
public class BruceWordCountTopology {

    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Business logic:
         * 1) read the files under the directory C:\\Users\\w_zhangyang\\Documents
         * 2) emit every line of every file
         */
        @Override
        public void nextTuple() {
            // Collect all .txt files under the directory (recursively).
            Collection<File> files = FileUtils.listFiles(new File("C:\\Users\\w_zhangyang\\Documents"),
                    new String[]{"txt"}, true);

            for (File file : files) {
                try {
                    List<String> readLines = FileUtils.readLines(file, "utf-8");
                    for (String line : readLines) {
                        // Emit the line downstream.
                        this.collector.emit(new Values(line));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // Rename the file once it has been processed; otherwise the
                // same file would be read again on every nextTuple call.
                try {
                    FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }


    /**
     * Splits the incoming lines.
     */
    public static class SplitBolt extends BaseRichBolt {

        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Business logic: split each line on punctuation, then emit
         * every character of the remaining fragments (for Chinese text,
         * each character is counted as a "word").
         */
        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            // The character class matches common Chinese punctuation;
            // omitEmptyStrings drops empty fragments between delimiters.
            List<String> splitToList = Splitter.onPattern("[!|。|,|“|”|;]").omitEmptyStrings().splitToList(line);
            for (String word : splitToList) {
                for (int i = 0; i < word.length(); i++) {
                    String character = String.valueOf(word.charAt(i));
                    this.collector.emit(new Values(character));
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * The counting Bolt: aggregates the word frequencies.
     */
    public static class CountBolt extends BaseRichBolt {

        private Map<String, Integer> countMap = Maps.newConcurrentMap();

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        }

        /**
         * Business logic:
         * 1) take each word
         * 2) add it to the running totals
         * 3) print the totals
         */
        @Override
        public void execute(Tuple input) {
            // 1) take each word
            String word = input.getStringByField("word");
            Integer num = countMap.get(word);
            if (num == null) {
                num = 0;
            }
            num++;
            // 2) add it to the running totals
            countMap.put(word, num);
            // 3) print the totals
            System.out.println("~~~~~~~~~~~~~~~~~~~~~~");
            countMap.forEach((k, v) -> {
                System.out.println(k + ":[" + v + "]");
            });
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // End of the pipeline; nothing is emitted.
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        StormTopology topology = builder.createTopology();

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("BruceWordCountTopology", new Config(), topology);
    }
}

Storm Programming Pitfalls

1) Exception in thread "main" java.lang.IllegalArgumentException: Spout has already been declared for id DataSourceSpout
[A Bolt and a Spout must not share the same component id]
2) org.apache.storm.generated.InvalidTopologyException: null
[Component ids must not start with a double underscore, which is reserved by Storm]
3) Duplicate topology names: in local mode this seems to cause no problem; we will verify it again when testing on a cluster
[i.e. submitting two topologies with the same name at the same time]
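
A minimal reproduction of the first two pitfalls, assuming the classes from the sum example above. Each call triggers its error independently: the first throws immediately, the second is only rejected when the topology is submitted.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());

// 1) Reusing an id that is already taken throws IllegalArgumentException:
//    "Spout has already been declared for id DataSourceSpout"
builder.setBolt("DataSourceSpout", new SumBolt())
       .shuffleGrouping("DataSourceSpout");

// 2) Ids starting with "__" are reserved for Storm's system components;
//    such a topology is rejected with InvalidTopologyException at submit time.
builder.setBolt("__SumBolt", new SumBolt())
       .shuffleGrouping("DataSourceSpout");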

Life is about more than just getting by; there is also the coast that I love.